package com.broker.utils.strorage.support;

import com.broker.base.IBrokerStorage;
import com.broker.base.StorageException;
import com.broker.utils.strorage.support.redis.BrokerJedis;
import com.broker.utils.strorage.support.redis.RedisConfig;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * @author kong <androidsimu@163.com>
 * create by 2019/3/12 9:29
 * Description: echatimbroker
 **/
public class RedisStorage implements IBrokerStorage {
    private BrokerJedis brokerJedis = null;

    public RedisStorage(RedisConfig config){
        brokerJedis = new BrokerJedis(config);
    }

    @Override
    public void setKeyValue(String obj, String key, String value) throws StorageException {
        synchronized (obj.intern()){
            brokerJedis.setHashKV(obj, key, value);
        }
    }

    @Override
    public void setKeyValue(String obj, Map<String, String> keyValues) throws StorageException {
        synchronized (obj.intern()){
            keyValues.forEach((k,v)->{
                brokerJedis.setHashKV(obj, k, v);
            });
        }
    }

    @Nullable
    @Override
    public String getValue(String obj, String key) throws StorageException {
        String value =  brokerJedis.getHashKV(obj, key);
        return value;
    }


    @Nullable
    @Override
    public String getValue(String obj, String key, Supplier<String> fetchFunc) throws StorageException {
        String value =  brokerJedis.getHashKV(obj, key);
        if(value == null){
            String newValue = fetchFunc.get();
            if(newValue != null){
                brokerJedis.setHashKV(obj, key, newValue);
            }
            return newValue;
        }
        return value;
    }


    @Override
    public Map<String, String> getValue(String obj, Set<String> keys) throws StorageException {
        Map<String, String> maps = new HashMap<>();
        keys.forEach(k->{
            String value = brokerJedis.getHashKV(obj, k);
            if(value != null){
                maps.put(k, value);
            }
        });
        return maps;
    }

    @Override
    public Map<String, String> getAllKeyValues(String obj) throws StorageException {
        Set<String> keys = brokerJedis.getHashAllKey(obj);
        Map<String, String> maps = new HashMap<>();
        keys.forEach(k->{
            String value = brokerJedis.getHashKV(obj, k);
            if(value != null){
                maps.put(k, brokerJedis.getHashKV(obj, k));
            }
        });
        return maps;
    }

    @Override
    public void remove(String obj, String key) throws StorageException {
        synchronized (obj.intern()){
            brokerJedis.delHashKV(obj, key);
        }
    }


    @Override
    public void globalSynchronized(String key, Consumer<String> handler) throws StorageException{
        while (true){
            Long result = brokerJedis.setKVNX(key, key);
            if(result == 1){
                handler.accept(key);
                brokerJedis.delKV(key);
                break;
            }
            else if(result == -1){
                throw new StorageException("Connected to redis server failed.");
            }
            else if(result == 0){
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}
